package ctrl

import (
	"net/http"
	"strconv"
	"github.com/gorilla/websocket"
	"log"
	"gopkg.in/fatih/set.v0"
	"sync"
	"fmt"
	"encoding/json"
	"net"
	"github.com/streadway/amqp"
	"../common"
)

//核心在于形成userid和Node的映射关系
type Node struct {
	Conn      *websocket.Conn //保存websocket连接,conn是io型的资源
	DataQueue chan []byte     //并行的数据转成串行的数据
	GroupSets set.Interface   //组,线程安全
}

//映射关系表(map的键是userid,值是Node,全局的map,所有协程共享)
var clientMap map[int64]*Node = make(map[int64]*Node)

//读写锁
var rwlocker sync.RWMutex

//接收到通过websocket传过来的消息结构体
type Message struct {
	Id int64 `json:"id,omitempty" form:"id"` //消息ID
	//谁发的
	Userid interface{} `json:"userid,omitempty" form:"userid"` //谁发的
	//什么业务
	Cmd int `json:"cmd,omitempty" form:"cmd"` //群聊还是私聊
	//发给谁
	Dstid int64 `json:"dstid,omitempty" form:"dstid"` //对端用户ID/群ID
	//怎么展示
	Media int `json:"media,omitempty" form:"media"` //消息按照什么样式展示
	//内容是什么
	Content string `json:"content,omitempty" form:"content"` //消息的内容
	//图片是什么
	Pic string `json:"pic,omitempty" form:"pic"` //预览图片
	//连接是什么
	Url string `json:"url,omitempty" form:"url"` //服务的URL
	//简单描述
	Memo string `json:"memo,omitempty" form:"memo"` //简单描述
	//其他的附加数据，语音长度/红包金额
	Amount int `json:"amount,omitempty" form:"amount"` //其他和数字相关的
}

//定义常量,消息类型是单聊,还是群聊,还是心跳
const (
	//点对点单聊,dstid是用户ID
	CMD_SINGLE_MSG = 10
	//群聊消息,dstid是群id
	CMD_ROOM_MSG = 11
	//心跳消息,不处理
	CMD_HEART = 0
)

//这个channel专门来存储udp广播的数据
var udpsendchan chan []byte = make(chan []byte, 2014)

//这个channel专门来存储rabbitmq广播的数据
var mqsendchan chan []byte = make(chan []byte, 2014)

//ws://127.0.0.1/chat?id=1&token=xxx
func Chat(writer http.ResponseWriter, request *http.Request) {

	//todo检验token是否合法
	query := request.URL.Query() //获取get后面携带的参数,得到的都是字符串
	id := query.Get("id")
	token := query.Get("token")

	//将id转换为int型
	userId, _ := strconv.ParseInt(id, 10, 64)

	//返回验证结果(验证参数正确性,websocket自带验证,所以我们就用websocket自带的)
	isvalida := checkToken(userId, token)

	//将当前http连接升级为websocket连接,之后每个协程都有自己的结构体,
	//结构体中保存当前协程的websocket连接,管道,群集合等,并且将自己的协程对应的
	//结构体bind到全局共享的map中,userid=>node,key=>value的形式,
	//并且每个协程都有自己两个子协程,(1)发送子协程:不断的读取当前这个协程对应的结构体
	//的管道,是否有数据通过管道传过来,如果有,则通过当前的node.Conn发送出去(管道:保证发送消息的顺序性)
	//(2)接收子协程:不断的从node.Conn中读取数据
	//
	conn, err := (&websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return isvalida
		},
	}).Upgrade(writer, request, nil)

	if (err != nil) {
		log.Println(err.Error()) //打印日志
		return                   //结束
	}

	//todo 获得conn
	node := &Node{
		Conn:      conn,
		DataQueue: make(chan []byte, 50),   //并行的数据转成串行的数据,有缓存
		GroupSets: set.New(set.ThreadSafe), //初始化set,线程安全
	}
	//获取用户关注的全部群id,之后放入这个用户的node.Groupset中,
	//发送群消息的时候,根据群id,遍历所有用户，看看每一个用户是否
	//包含这个群id,有则发送
	comIds := contactService.SearchComunityIds(userId)
	for _, v := range comIds {
		node.GroupSets.Add(v)
	}

	//todo userid和node形成绑定关系,
	//由于这个map操作频率非常大,我们要用锁,保证数据安全
	rwlocker.Lock()
	clientMap[userId] = node
	rwlocker.Unlock()

	log.Printf("%d客户端连接啦", userId)
	//完成发送逻辑
	go sendproc(node)

	//完成接收逻辑
	go recvproc(node, userId)

	//往发送协程的channel(有缓存)写数据
	sendMsg(userId, []byte("hello world"))

}

//检测是否有效
func checkToken(userid int64, token string) bool {
	//从数据库里面查询并比对,todo
	token_resp := userService.Find(userid).Token
	if (token == token_resp) {
		return true
	} else {
		return false
	}
}

//服务端向客户端发送协程
func sendproc(node *Node) {
	for {
		select {
		//一直监听当前这个连接，是否有数据过来，将数据写到node.DataQueue,
		//之后从node.DataQueue管道中读取数据,通过node.Conn发送到客户端
		case data := <-node.DataQueue:
			err := node.Conn.WriteMessage(websocket.TextMessage, data)
			if err != nil {
				//发送出错
				log.Println(err.Error())
				return
			}
		}
	}
}

//接收协程
func recvproc(node *Node, userId int64) {

	for {
		//不断的从websocket连接中读取数据,
		_, data, err := node.Conn.ReadMessage()
		//websocket关闭时,连接回收
		defer node.Conn.Close()
		if err != nil {

			log.Println(err.Error())
			log.Printf("%d客户端websocket连接关闭啦", userId)

			//前端断线清理clientMap中的信息
			rwlocker.Lock()
			delete(clientMap, userId)
			rwlocker.Unlock()

			return
		}
		//对通过websocket穿过的data(json格式的[]byte)做进一步处理
		switch common.G_config.DistributedType {
		case "1":
			//单机推送模式
			dispatch(data)
		case "2":
			//udp推送模式
			//把消息广播到局域网
			broadMsg(data)

		case "3":
			//rabbitmq推送模式
			pushMsg(data)
		}

		fmt.Printf("recv<=%s\n", data)
	}
}

//rabbitmq推送消息,把websocket发过来的消息都通过mq广播出去
func pushMsg(data []byte) {
	mqsendchan <- data
}

//todo 添加新的群ID到用户的groupset中
func AddGroupId(userId, gid int64) {
	//取得node
	rwlocker.Lock()
	node, ok := clientMap[userId]
	if ok {
		node.GroupSets.Add(gid)
	}
	//clientMap[userId] = node
	rwlocker.Unlock()
	//添加gid到set
}

//发送消息
func sendMsg(userId int64, msg []byte) {
	//对于map,需要判断这个值是不是存在的,对于map操作我们要保证并发线程安全
	rwlocker.RLock()
	node, ok := clientMap[userId]
	rwlocker.RUnlock()

	if ok {
		//真,这个值存在
		node.DataQueue <- msg
	}
}

//后端调度逻辑处理(单体架构)
func dispatch(data []byte) {
	//解析data为message
	msg := Message{}

	//将传过来的[]byte数据赋值到结构体中
	err := json.Unmarshal(data, &msg)
	if (err != nil) {
		log.Println(err.Error())
		return
	}
	//根据cmd对逻辑进行处理
	switch msg.Cmd {
	case CMD_SINGLE_MSG:
		//单聊
		sendMsg(msg.Dstid, data)
	case CMD_ROOM_MSG:
		//群聊,遍历所有的clientmap
		for _, v := range clientMap {
			if v.GroupSets.Has(msg.Dstid) {
				v.DataQueue <- data
			}
		}

	case CMD_HEART:
		//todo心跳,保证websocket长连接一直存在,一般啥都不做
	}

}

//分布式:把消息广播到局域网:流程为服务端接收协程收到消息后,
//把消息放到udp的channel中,之后每一台机器启动的时候都会建立一个udp的发送协程,接收协程
//之后这台机器的udp发送协程从udp的channel中读取数据,将数据写入udp,广播出去,别的所有机器的udp接收协程会收到消息,
//之后将这条消息发到指定用户,有的机器有这个用户,有的机器没有这个用户,没有就不发,有就发,因为用户userid是唯一的
//一个用户只会连接到一台服务器上,可以换成rabbitmq则更好一些,确保消息准确到达
func broadMsg(data []byte) {
	udpsendchan <- data
}

//读取配置文件,选择是单机推送,还是分布式udp推送模式,还是分布式rabbitmq推送模式
func InitPush() {
	switch common.G_config.DistributedType {
	case "2":
		//udp广播
		go udpsendproc()
		go udprecvproc()
	case "3":

		//(1)服务启动时候,创建交换机
		rabbitmq_create_exchange()

		//(2)rabbitmq发送协程,一直读取mqsendchan,有消息之后就投递到mq
		go rabbitmq_send()

		//(3)rabbitmq接收协程
		go rabbitmq_recv()
	}
}

//rabbitmq创建交换机
func rabbitmq_create_exchange() {

	rabbitmq_user := common.G_config.Distributed_rabbitmq_user
	rabbitmq_password := common.G_config.Distributed_rabbitmq_password
	rabbitmq_ip := common.G_config.Distributed_rabbitmq_ip
	rabbitmq_port := common.G_config.Distributed_rabbitmq_port

	conn, err := amqp.Dial("amqp://" + rabbitmq_user + ":" + rabbitmq_password + "@" + rabbitmq_ip + ":" + rabbitmq_port + "/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"zhj-exchange", // name
		"fanout",       // type
		true,           // durable
		false,          // auto-deleted
		false,          // internal
		false,          // no-wait
		nil,            // arguments
	)
	failOnError(err, "Failed to declare an exchange")
}

//rabbitmq接收协程
func rabbitmq_recv() {

	rabbitmq_user := common.G_config.Distributed_rabbitmq_user
	rabbitmq_password := common.G_config.Distributed_rabbitmq_password
	rabbitmq_ip := common.G_config.Distributed_rabbitmq_ip
	rabbitmq_port := common.G_config.Distributed_rabbitmq_port

	conn, err := amqp.Dial("amqp://" + rabbitmq_user + ":" + rabbitmq_password + "@" + rabbitmq_ip + ":" + rabbitmq_port + "/")
	defer conn.Close()
	failOnError(err, "连接mq失败")
	log.Println("rabbitmq连接成功!")

	ch, err := conn.Channel()
	failOnError(err, "创建mq-channel失败")
	log.Println("创建mq-channel成功")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"zhj-exchange", // name
		"fanout",       // type
		true,           // durable
		false,          // auto-deleted
		false,          // internal
		false,          // no-wait
		nil,            // arguments
	)
	failOnError(err, "声明交换机失败")
	log.Println("声明交换机成功")
	q, err := ch.QueueDeclare(
		"zhj-queue-2", // name
		false,         // durable
		false,         // delete when unused
		true,          // exclusive
		false,         // no-wait
		nil,           // arguments
	)
	failOnError(err, "声明队列失败")
	log.Println("声明队列成功")
	err = ch.QueueBind(
		q.Name,         // queue name
		"",             // routing key
		"zhj-exchange", // exchange
		false,
		nil)
	failOnError(err, "bind到交换机失败")
	log.Println("bind到交换机成功")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "消费mq消息失败")
	log.Println("消费mq消息成功")
	forever := make(chan bool)

	go func() {
		for d := range msgs {
			//接收到别的机器发送来的消息
			log.Printf(" [x] %s", d.Body)
			dispatch(d.Body)

		}
	}()

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	<-forever
}

//rabbitmq发送协程
func rabbitmq_send() {

	rabbitmq_user := common.G_config.Distributed_rabbitmq_user
	rabbitmq_password := common.G_config.Distributed_rabbitmq_password
	rabbitmq_ip := common.G_config.Distributed_rabbitmq_ip
	rabbitmq_port := common.G_config.Distributed_rabbitmq_port

	conn, err := amqp.Dial("amqp://" + rabbitmq_user + ":" + rabbitmq_password + "@" + rabbitmq_ip + ":" + rabbitmq_port + "/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"zhj-exchange", // name
		"fanout",       // type
		true,           // durable
		false,          // auto-deleted
		false,          // internal
		false,          // no-wait
		nil,            // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	//rabbitmq发送协程,一直读取mqsendchan,有消息之后就投递到mq
	for {
		select {
		case body := <-mqsendchan:
			//有消息被投递,则将此消息发送至交换机
			err = ch.Publish(
				"zhj-exchange", // exchange
				"",             // routing key
				false,          // mandatory
				false,          // immediate
				amqp.Publishing{
					ContentType: "text/plain",
					Body:        []byte(body),
				})
			failOnError(err, "Failed to publish a message")
			log.Printf(" [x] Sent %s", body)
		}
	}
}

//完成udp数据的发送协程
func udpsendproc() {
	log.Println("start udpsendproc success")
	//使用udp协议拨号(第一个参数:协议,第二个: udp地址(nil本机地址),第三个:能接收到udp消息的地址)//10.33.162.14
	net_IPv4_a, _ := strconv.Atoi(common.G_config.Distributed_net_IPv4_a)
	net_IPv4_b, _ := strconv.Atoi(common.G_config.Distributed_net_IPv4_b)
	net_IPv4_c, _ := strconv.Atoi(common.G_config.Distributed_net_IPv4_c)

	netmask, _ := strconv.Atoi(common.G_config.Distributed_netmask)
	port, _ := strconv.Atoi(common.G_config.Distributed_port)

	// 数据类型需要转换一下
	conn, err := net.DialUDP("udp", nil, &net.UDPAddr{IP: net.IPv4(uint8(net_IPv4_a), uint8(net_IPv4_b), uint8(net_IPv4_c), uint8(netmask)), Port: port,})
	if err != nil {
		log.Println(err.Error())
		return
	}
	defer conn.Close()

	//通过conn发送消息,广播出去
	for {
		select {
		case data := <-udpsendchan:
			_, err := conn.Write(data)
			if err != nil {
				log.Println(err.Error())
				return
			}
		}
	}

}

//完成udp接收协程
func udprecvproc() {
	port, err := strconv.Atoi(common.G_config.Distributed_port)
	log.Println("start udprecvproc success")
	//监听udp广播端口
	conn, err := net.ListenUDP("udp", &net.UDPAddr{
		IP:   net.IPv4zero,
		Port: port,
	})
	defer conn.Close()
	if err != nil {
		log.Println(err.Error())
		return
	}

	//处理端口发送过来的消息
	for {
		var buf [512]byte

		n, err := conn.Read(buf[0:])
		if err != nil {
			log.Println(err.Error())
			return
		}

		//直接通过udp传送过来的数据
		dispatch(buf[0:n])
	}
}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

